{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "name": "数据管道.ipynb",
      "provenance": [],
      "toc_visible": true
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    }
  },
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "z8QGRZ6gxmMN"
      },
      "source": [
        "# 数据管道 Dataset\n",
        "\n",
        "如果需要训练的数据大小不大，例如不到 1G，那么可以直接全部读入内存中进行训练，这样一般效率最高。但如果需要训练的数据很大，例如超过 10G，无法一次载入内存，那么通常需要在训练的过程中分批逐渐读入。使用 tf.data API 可以构建数据输入管道，轻松处理大量的数据，不同的数据格式，以及不同的数据转换。\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "qTu9S_j-xs0X"
      },
      "source": [
        "# 管道构建\n",
        "\n",
        "可以从 Numpy array, Pandas DataFrame, Python generator, csv文件, 文本文件, 文件路径, tfrecords 文件等方式构建数据管道。其中通过 Numpy array, Pandas DataFrame, 文件路径构建数据管道是最常用的方法。\n",
        "\n",
        "通过 tfrecords 文件方式构建数据管道较为复杂，需要对样本构建 tf.Example 后压缩成字符串写到tfrecords 文件，读取后再解析成 tf.Example。但 tfrecords 文件的优点是压缩后文件较小，便于网络传播，加载速度较快。\n",
        "\n",
        "\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "auGepP1ZzIBn"
      },
      "source": [
        "## 从 Numpy array 构建数据管道"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "qVAK1QlIzMJl",
        "outputId": "f242771d-0839-42b0-a4fe-83214e6e927d",
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 102
        }
      },
      "source": [
        "# 从 Numpy array 构建数据管道\n",
        "from sklearn import datasets \n",
        "iris = datasets.load_iris()\n",
        "\n",
        "ds1 = tf.data.Dataset.from_tensor_slices((iris[\"data\"],iris[\"target\"]))\n",
        "for features,label in ds1.take(5):\n",
        "    print(features,label)"
      ],
      "execution_count": 1,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "tf.Tensor([5.1 3.5 1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)\n",
            "tf.Tensor([4.9 3.  1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)\n",
            "tf.Tensor([4.7 3.2 1.3 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)\n",
            "tf.Tensor([4.6 3.1 1.5 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)\n",
            "tf.Tensor([5.  3.6 1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "cy4Ttphh02D2"
      },
      "source": [
        "## 从 Pandas DataFrame 构建数据管道"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "p42dNn-K038F",
        "outputId": "aae3de11-676a-4af7-e8f9-6327b8835dd9",
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 88
        }
      },
      "source": [
        "from sklearn import datasets \n",
        "import pandas as pd\n",
        "iris = datasets.load_iris()\n",
        "dfiris = pd.DataFrame(iris[\"data\"],columns = iris.feature_names)\n",
        "ds2 = tf.data.Dataset.from_tensor_slices((dfiris.to_dict(\"list\"),iris[\"target\"]))\n",
        "\n",
        "for features,label in ds2.take(3):\n",
        "    print(features,label)"
      ],
      "execution_count": 2,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "{'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=5.1>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.5>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.4>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)\n",
            "{'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=4.9>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.0>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.4>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)\n",
            "{'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=4.7>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.2>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.3>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "cYhqFUrh2CWx"
      },
      "source": [
        "## 从 Python Generator 构建数据管道"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "FCYr5DTs2BiS"
      },
      "source": [
        "# 从Python generator构建数据管道\n",
        "import tensorflow as tf\n",
        "from matplotlib import pyplot as plt \n",
        "from tensorflow.keras.preprocessing.image import ImageDataGenerator\n",
        "\n",
        "# 定义一个从文件中读取图片的generator\n",
        "image_generator = ImageDataGenerator(rescale=1.0/255).flow_from_directory(\n",
        "                    \"./data/cifar2/test/\",\n",
        "                    target_size=(32, 32),\n",
        "                    batch_size=20,\n",
        "                    class_mode='binary')\n",
        "\n",
        "classdict = image_generator.class_indices\n",
        "print(classdict)\n",
        "\n",
        "def generator():\n",
        "    for features,label in image_generator:\n",
        "        yield (features,label)\n",
        "\n",
        "ds3 = tf.data.Dataset.from_generator(generator,output_types=(tf.float32,tf.int32))"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "OXdToofO2Iyd"
      },
      "source": [
        "## 从 CSV 文件构建数据管道"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "DWJmcyrR2Mkd"
      },
      "source": [
        "# 从csv文件构建数据管道\n",
        "ds4 = tf.data.experimental.make_csv_dataset(\n",
        "      file_pattern = [\"./data/titanic/train.csv\",\"./data/titanic/test.csv\"],\n",
        "      batch_size=3, \n",
        "      label_name=\"Survived\",\n",
        "      na_value=\"\",\n",
        "      num_epochs=1,\n",
        "      ignore_errors=True)\n",
        "\n",
        "for data,label in ds4.take(2):\n",
        "    print(data,label)"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "XnVf_Ynt2N_8"
      },
      "source": [
        "## 从文本文件构建数据管道"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "mQITay9_2SUK"
      },
      "source": [
        "ds5 = tf.data.TextLineDataset(\n",
        "    filenames = [\"./data/titanic/train.csv\",\"./data/titanic/test.csv\"]\n",
        "    ).skip(1) #略去第一行header\n",
        "\n",
        "for line in ds5.take(5):\n",
        "    print(line)"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8CzVp4LP2VGD"
      },
      "source": [
        "## 从文件路径构建数据管道"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "ueLujeJj2Wdt"
      },
      "source": [
        "ds6 = tf.data.Dataset.list_files(\"./data/cifar2/train/*/*.jpg\")\n",
        "for file in ds6.take(5):\n",
        "    print(file)"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "PhjMiURu2ZA0"
      },
      "source": [
        "## 从 tfrecords 文件构建数据管道"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "5AW2tjzp2a5G"
      },
      "source": [
        "import os\n",
        "import numpy as np\n",
        "\n",
        "# inpath：原始数据路径 outpath:TFRecord文件输出路径\n",
        "def create_tfrecords(inpath,outpath): \n",
        "    writer = tf.io.TFRecordWriter(outpath)\n",
        "    dirs = os.listdir(inpath)\n",
        "    for index, name in enumerate(dirs):\n",
        "        class_path = inpath +\"/\"+ name+\"/\"\n",
        "        for img_name in os.listdir(class_path):\n",
        "            img_path = class_path + img_name\n",
        "            img = tf.io.read_file(img_path)\n",
        "            #img = tf.image.decode_image(img)\n",
        "            #img = tf.image.encode_jpeg(img) #统一成jpeg格式压缩\n",
        "            example = tf.train.Example(\n",
        "               features=tf.train.Features(feature={\n",
        "                    'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[index])),\n",
        "                    'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img.numpy()]))\n",
        "               }))\n",
        "            writer.write(example.SerializeToString())\n",
        "    writer.close()\n",
        "    \n",
        "create_tfrecords(\"./data/cifar2/test/\",\"./data/cifar2_test.tfrecords/\")"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_YroVYJo2edo"
      },
      "source": [
        "# 应用数据转换\n",
        "\n",
        "Dataset数据结构应用非常灵活，因为它本质上是一个Sequece序列，其每个元素可以是各种类型，例如可以是张量，列表，字典，也可以是Dataset。\n",
        "\n",
        "Dataset包含了非常丰富的数据转换功能。\n",
        "\n",
        "- map: 将转换函数映射到数据集每一个元素。\n",
        "- flat_map: 将转换函数映射到数据集的每一个元素，并将嵌套的Dataset压平。\n",
        "- interleave: 效果类似flat_map,但可以将不同来源的数据夹在一起。\n",
        "- filter: 过滤掉某些元素。\n",
        "- zip: 将两个长度相同的Dataset横向铰合。\n",
        "- concatenate: 将两个Dataset纵向连接。\n",
        "- reduce: 执行归并操作。\n",
        "- batch : 构建批次，每次放一个批次。比原始数据增加一个维度。 其逆操作为unbatch。\n",
        "- padded_batch: 构建批次，类似batch, 但可以填充到相同的形状。\n",
        "- window :构建滑动窗口，返回Dataset of Dataset.\n",
        "- shuffle: 数据顺序洗牌。\n",
        "- repeat: 重复数据若干次，不带参数时，重复无数次。\n",
        "- shard: 采样，从某个位置开始隔固定距离采样一个元素。\n",
        "- take: 采样，从开始位置取前几个元素。"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "yBjy1VTb2yCn",
        "outputId": "ca5923ea-8aa5-4ed5-e76e-5c3b25fe74cb",
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 1000
        }
      },
      "source": [
        "# map:将转换函数映射到数据集每一个元素\n",
        "print(\"=====map=====\")\n",
        "ds = tf.data.Dataset.from_tensor_slices([\"hello world\",\"hello China\",\"hello Beijing\"])\n",
        "ds_map = ds.map(lambda x:tf.strings.split(x,\" \"))\n",
        "for x in ds_map:\n",
        "    tf.print(x)\n",
        "\n",
        "# flat_map:将转换函数映射到数据集的每一个元素，并将嵌套的Dataset压平。\n",
        "print(\"=====flat_map=====\")\n",
        "ds = tf.data.Dataset.from_tensor_slices([\"hello world\",\"hello China\",\"hello Beijing\"])\n",
        "ds_flatmap = ds.flat_map(lambda x:tf.data.Dataset.from_tensor_slices(tf.strings.split(x,\" \")))\n",
        "for x in ds_flatmap:\n",
        "    tf.print(x)\n",
        "\n",
        "# interleave: 效果类似flat_map,但可以将不同来源的数据夹在一起。\n",
        "print(\"=====interleave=====\")\n",
        "ds = tf.data.Dataset.from_tensor_slices([\"hello world\",\"hello China\",\"hello Beijing\"])\n",
        "ds_interleave = ds.interleave(lambda x:tf.data.Dataset.from_tensor_slices(tf.strings.split(x,\" \")))\n",
        "for x in ds_interleave:\n",
        "    tf.print(x)\n",
        "\n",
        "print(\"=====filter=====\")\n",
        "# filter:过滤掉某些元素。\n",
        "ds = tf.data.Dataset.from_tensor_slices([\"hello world\",\"hello China\",\"hello Beijing\"])\n",
        "#找出含有字母a或B的元素\n",
        "ds_filter = ds.filter(lambda x: tf.strings.regex_full_match(x, \".*[a|B].*\"))\n",
        "for x in ds_filter:\n",
        "    tf.print(x)\n",
        "\n",
        "print(\"=====zip=====\")\n",
        "# zip:将两个长度相同的Dataset横向铰合。\n",
        "ds1 = tf.data.Dataset.range(0,3)\n",
        "ds2 = tf.data.Dataset.range(3,6)\n",
        "ds3 = tf.data.Dataset.range(6,9)\n",
        "ds_zip = tf.data.Dataset.zip((ds1,ds2,ds3))\n",
        "for x,y,z in ds_zip:\n",
        "    print(x.numpy(),y.numpy(),z.numpy())\n",
        "\n",
        "print(\"=====condatenate=====\")\n",
        "# condatenate:将两个Dataset纵向连接。\n",
        "ds1 = tf.data.Dataset.range(0,3)\n",
        "ds2 = tf.data.Dataset.range(3,6)\n",
        "ds_concat = tf.data.Dataset.concatenate(ds1,ds2)\n",
        "for x in ds_concat:\n",
        "    print(x)\n",
        "\n",
        "print(\"=====reduce=====\")\n",
        "# reduce:执行归并操作。\n",
        "ds = tf.data.Dataset.from_tensor_slices([1,2,3,4,5.0])\n",
        "result = ds.reduce(0.0,lambda x,y:tf.add(x,y))\n",
        "tf.print(result)\n",
        "\n",
        "print(\"=====batch=====\")\n",
        "# batch:构建批次，每次放一个批次。比原始数据增加一个维度。 其逆操作为unbatch。 \n",
        "ds = tf.data.Dataset.range(12)\n",
        "ds_batch = ds.batch(4)\n",
        "for x in ds_batch:\n",
        "    tf.print(x)\n",
        "\n",
        "print(\"=====padded_batch=====\")\n",
        "# padded_batch:构建批次，类似batch, 但可以填充到相同的形状。\n",
        "elements = [[1, 2],[3, 4, 5],[6, 7],[8]]\n",
        "ds = tf.data.Dataset.from_generator(lambda: iter(elements), tf.int32)\n",
        "\n",
        "ds_padded_batch = ds.padded_batch(2,padded_shapes = [4,])\n",
        "for x in ds_padded_batch:\n",
        "    print(x)    \n",
        "\n",
        "print(\"=====window=====\")\n",
        "# window:构建滑动窗口，返回Dataset of Dataset.\n",
        "ds = tf.data.Dataset.range(12)\n",
        "# window 返回的是 Dataset of Dataset,可以用 flat_map 压平\n",
        "ds_window = ds.window(3, shift=1).flat_map(lambda x: x.batch(3, drop_remainder=True)) \n",
        "for x in ds_window:\n",
        "    print(x)\n",
        "\n",
        "print(\"=====shuffle=====\")\n",
        "# shuffle:数据顺序洗牌。\n",
        "ds = tf.data.Dataset.range(12)\n",
        "ds_shuffle = ds.shuffle(buffer_size = 5)\n",
        "for x in ds_shuffle:\n",
        "    print(x)\n",
        "\n",
        "print(\"=====repeat=====\")\n",
        "# repeat:重复数据若干次，不带参数时，重复无数次。\n",
        "ds = tf.data.Dataset.range(3)\n",
        "ds_repeat = ds.repeat(3)\n",
        "for x in ds_repeat:\n",
        "    print(x)\n",
        "\n",
        "print(\"=====shard=====\")\n",
        "# shard:采样，从某个位置开始隔固定距离采样一个元素。\n",
        "ds = tf.data.Dataset.range(12)\n",
        "ds_shard = ds.shard(3,index = 1)\n",
        "\n",
        "for x in ds_shard:\n",
        "    print(x)\n",
        "\n",
        "print(\"=====take=====\")\n",
        "# take:采样，从开始位置取前几个元素。\n",
        "ds = tf.data.Dataset.range(12)\n",
        "ds_take = ds.take(3)\n",
        "\n",
        "list(ds_take.as_numpy_iterator())"
      ],
      "execution_count": 12,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "=====map=====\n",
            "[\"hello\" \"world\"]\n",
            "[\"hello\" \"China\"]\n",
            "[\"hello\" \"Beijing\"]\n",
            "=====flat_map=====\n",
            "hello\n",
            "world\n",
            "hello\n",
            "China\n",
            "hello\n",
            "Beijing\n",
            "=====interleave=====\n",
            "hello\n",
            "hello\n",
            "world\n",
            "China\n",
            "hello\n",
            "Beijing\n",
            "=====filter=====\n",
            "hello China\n",
            "hello Beijing\n",
            "=====zip=====\n",
            "0 3 6\n",
            "1 4 7\n",
            "2 5 8\n",
            "=====condatenate=====\n",
            "tf.Tensor(0, shape=(), dtype=int64)\n",
            "tf.Tensor(1, shape=(), dtype=int64)\n",
            "tf.Tensor(2, shape=(), dtype=int64)\n",
            "tf.Tensor(3, shape=(), dtype=int64)\n",
            "tf.Tensor(4, shape=(), dtype=int64)\n",
            "tf.Tensor(5, shape=(), dtype=int64)\n",
            "=====reduce=====\n",
            "15\n",
            "=====batch=====\n",
            "[0 1 2 3]\n",
            "[4 5 6 7]\n",
            "[8 9 10 11]\n",
            "=====padded_batch=====\n",
            "tf.Tensor(\n",
            "[[1 2 0 0]\n",
            " [3 4 5 0]], shape=(2, 4), dtype=int32)\n",
            "tf.Tensor(\n",
            "[[6 7 0 0]\n",
            " [8 0 0 0]], shape=(2, 4), dtype=int32)\n",
            "=====window=====\n",
            "tf.Tensor([0 1 2], shape=(3,), dtype=int64)\n",
            "tf.Tensor([1 2 3], shape=(3,), dtype=int64)\n",
            "tf.Tensor([2 3 4], shape=(3,), dtype=int64)\n",
            "tf.Tensor([3 4 5], shape=(3,), dtype=int64)\n",
            "tf.Tensor([4 5 6], shape=(3,), dtype=int64)\n",
            "tf.Tensor([5 6 7], shape=(3,), dtype=int64)\n",
            "tf.Tensor([6 7 8], shape=(3,), dtype=int64)\n",
            "tf.Tensor([7 8 9], shape=(3,), dtype=int64)\n",
            "tf.Tensor([ 8  9 10], shape=(3,), dtype=int64)\n",
            "tf.Tensor([ 9 10 11], shape=(3,), dtype=int64)\n",
            "=====shuffle=====\n",
            "tf.Tensor(4, shape=(), dtype=int64)\n",
            "tf.Tensor(5, shape=(), dtype=int64)\n",
            "tf.Tensor(1, shape=(), dtype=int64)\n",
            "tf.Tensor(3, shape=(), dtype=int64)\n",
            "tf.Tensor(7, shape=(), dtype=int64)\n",
            "tf.Tensor(2, shape=(), dtype=int64)\n",
            "tf.Tensor(10, shape=(), dtype=int64)\n",
            "tf.Tensor(6, shape=(), dtype=int64)\n",
            "tf.Tensor(8, shape=(), dtype=int64)\n",
            "tf.Tensor(11, shape=(), dtype=int64)\n",
            "tf.Tensor(9, shape=(), dtype=int64)\n",
            "tf.Tensor(0, shape=(), dtype=int64)\n",
            "=====repeat=====\n",
            "tf.Tensor(0, shape=(), dtype=int64)\n",
            "tf.Tensor(1, shape=(), dtype=int64)\n",
            "tf.Tensor(2, shape=(), dtype=int64)\n",
            "tf.Tensor(0, shape=(), dtype=int64)\n",
            "tf.Tensor(1, shape=(), dtype=int64)\n",
            "tf.Tensor(2, shape=(), dtype=int64)\n",
            "tf.Tensor(0, shape=(), dtype=int64)\n",
            "tf.Tensor(1, shape=(), dtype=int64)\n",
            "tf.Tensor(2, shape=(), dtype=int64)\n",
            "=====shard=====\n",
            "tf.Tensor(1, shape=(), dtype=int64)\n",
            "tf.Tensor(4, shape=(), dtype=int64)\n",
            "tf.Tensor(7, shape=(), dtype=int64)\n",
            "tf.Tensor(10, shape=(), dtype=int64)\n",
            "=====take=====\n"
          ],
          "name": "stdout"
        },
        {
          "output_type": "execute_result",
          "data": {
            "text/plain": [
              "[0, 1, 2]"
            ]
          },
          "metadata": {
            "tags": []
          },
          "execution_count": 12
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "qTqFiaHR4zxW"
      },
      "source": [
        "# 提升管道性能\n",
        "\n",
        "训练深度学习模型常常会非常耗时。模型训练的耗时主要来自于两个部分，一部分来自数据准备，另一部分来自参数迭代。参数迭代过程的耗时通常依赖于GPU来提升。而数据准备过程的耗时则可以通过构建高效的数据管道进行提升。\n",
        "\n",
        "以下是一些构建高效数据管道的建议。\n",
        "\n",
        "- 使用 prefetch 方法让数据准备和参数迭代两个过程相互并行。\n",
        "- 使用 interleave 方法可以让数据读取过程多进程执行,并将不同来源数据夹在一起。\n",
        "- 使用 map 时设置num_parallel_calls 让数据转换过程多进程执行。\n",
        "- 使用 cache 方法让数据在第一个epoch后缓存到内存中，仅限于数据集不大情形。\n",
        "- 使用 map转换时，先batch, 然后采用向量化的转换方法对每个batch进行转换。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "5q-mgMMR6U16"
      },
      "source": [
        "## 使用 prefetch 方法让数据准备和参数迭代两个过程相互并行"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "ADLAEnnn6Txv",
        "outputId": "77976ba8-7721-4044-8dba-095cfa82cc7f",
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 153
        }
      },
      "source": [
        "import tensorflow as tf\n",
        "\n",
        "#打印时间分割线\n",
        "@tf.function\n",
        "def printbar():\n",
        "    ts = tf.timestamp()\n",
        "    today_ts = ts%(24*60*60)\n",
        "\n",
        "    hour = tf.cast(today_ts//3600+8,tf.int32)%tf.constant(24)\n",
        "    minite = tf.cast((today_ts%3600)//60,tf.int32)\n",
        "    second = tf.cast(tf.floor(today_ts%60),tf.int32)\n",
        "    \n",
        "    def timeformat(m):\n",
        "        if tf.strings.length(tf.strings.format(\"{}\",m))==1:\n",
        "            return(tf.strings.format(\"0{}\",m))\n",
        "        else:\n",
        "            return(tf.strings.format(\"{}\",m))\n",
        "    \n",
        "    timestring = tf.strings.join([timeformat(hour),timeformat(minite),\n",
        "                timeformat(second)],separator = \":\")\n",
        "    tf.print(\"==========\"*8,end = \"\")\n",
        "    tf.print(timestring)\n",
        "    \n",
        "import time\n",
        "\n",
        "# 数据准备和参数迭代两个过程默认情况下是串行的。\n",
        "\n",
        "# 模拟数据准备\n",
        "def generator():\n",
        "    for i in range(10):\n",
        "        # 假设每次准备数据需要2s\n",
        "        time.sleep(2) \n",
        "        yield i \n",
        "\n",
        "ds = tf.data.Dataset.from_generator(generator,output_types = (tf.int32))\n",
        "\n",
        "# 模拟参数迭代\n",
        "def train_step():\n",
        "    # 假设每一步训练需要 1s\n",
        "    time.sleep(1) \n",
        "    \n",
        "# 训练过程预计耗时 10*2+10*1 = 30s\n",
        "printbar()\n",
        "tf.print(tf.constant(\"start training...\"))\n",
        "for x in ds:\n",
        "    train_step()  \n",
        "printbar()\n",
        "tf.print(tf.constant(\"end training...\"))\n",
        "\n",
        "# 使用 prefetch 方法让数据准备和参数迭代两个过程相互并行。\n",
        "\n",
        "# 训练过程预计耗时 max(10*2,10*1) = 20s\n",
        "printbar()\n",
        "tf.print(tf.constant(\"start training with prefetch...\"))\n",
        "\n",
        "# tf.data.experimental.AUTOTUNE 可以让程序自动选择合适的参数\n",
        "for x in ds.prefetch(buffer_size = tf.data.experimental.AUTOTUNE):\n",
        "    train_step()  \n",
        "    \n",
        "printbar()\n",
        "tf.print(tf.constant(\"end training...\"))\n"
      ],
      "execution_count": 13,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "================================================================================20:36:20\n",
            "start training...\n",
            "================================================================================20:36:50\n",
            "end training...\n",
            "================================================================================20:36:50\n",
            "start training with prefetch...\n",
            "================================================================================20:37:11\n",
            "end training...\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "dIztAR6O6qBW"
      },
      "source": [
        "## 使用 interleave 方法可以让数据读取过程多进程执行,并将不同来源数据夹在一起"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "NH6vsR6z6rkh"
      },
      "source": [
        "ds_files = tf.data.Dataset.list_files(\"./data/titanic/*.csv\")\n",
        "ds = ds_files.flat_map(lambda x:tf.data.TextLineDataset(x).skip(1))\n",
        "for line in ds.take(4):\n",
        "    print(line)\n",
        "\n",
        "ds_files = tf.data.Dataset.list_files(\"./data/titanic/*.csv\")\n",
        "ds = ds_files.interleave(lambda x:tf.data.TextLineDataset(x).skip(1))\n",
        "for line in ds.take(8):\n",
        "    print(line)\n"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "q4yaq-6l67GG"
      },
      "source": [
        "## 使用 map 时设置num_parallel_calls 让数据转换过程多进行执行"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "xhQ6wvZB7AQX"
      },
      "source": [
        "ds = tf.data.Dataset.list_files(\"./data/cifar2/train/*/*.jpg\")\n",
        "def load_image(img_path,size = (32,32)):\n",
        "    label = 1 if tf.strings.regex_full_match(img_path,\".*/automobile/.*\") else 0\n",
        "    img = tf.io.read_file(img_path)\n",
        "    img = tf.image.decode_jpeg(img) #注意此处为jpeg格式\n",
        "    img = tf.image.resize(img,size)\n",
        "    return(img,label)\n",
        "#单进程转换\n",
        "printbar()\n",
        "tf.print(tf.constant(\"start transformation...\"))\n",
        "\n",
        "ds_map = ds.map(load_image)\n",
        "for _ in ds_map:\n",
        "    pass\n",
        "\n",
        "printbar()\n",
        "tf.print(tf.constant(\"end transformation...\"))\n",
        "#多进程转换\n",
        "printbar()\n",
        "tf.print(tf.constant(\"start parallel transformation...\"))\n",
        "\n",
        "ds_map_parallel = ds.map(load_image,num_parallel_calls = tf.data.experimental.AUTOTUNE)\n",
        "for _ in ds_map_parallel:\n",
        "    pass\n",
        "\n",
        "printbar()\n",
        "tf.print(tf.constant(\"end parallel transformation...\"))"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "9TD28bsz7Cpk"
      },
      "source": [
        "## 使用 cache 方法让数据在第一个epoch后缓存到内存中，仅限于数据集不大情形。\n",
        "\n"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "lz3i4q8h7GY_",
        "outputId": "9165ef7c-5fe9-474a-d1bd-ae40e268e3dc",
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 357
        }
      },
      "source": [
        "import time\n",
        "\n",
        "# 模拟数据准备\n",
        "def generator():\n",
        "    for i in range(5):\n",
        "        #假设每次准备数据需要2s\n",
        "        time.sleep(2) \n",
        "        yield i \n",
        "ds = tf.data.Dataset.from_generator(generator,output_types = (tf.int32))\n",
        "\n",
        "# 模拟参数迭代\n",
        "def train_step():\n",
        "    #假设每一步训练需要0s\n",
        "    pass\n",
        "\n",
        "# 训练过程预计耗时 (5*2+5*0)*3 = 30s\n",
        "printbar()\n",
        "tf.print(tf.constant(\"start training...\"))\n",
        "for epoch in tf.range(3):\n",
        "    for x in ds:\n",
        "        train_step()  \n",
        "    printbar()\n",
        "    tf.print(\"epoch =\",epoch,\" ended\")\n",
        "printbar()\n",
        "tf.print(tf.constant(\"end training...\"))\n",
        "import time\n",
        "\n",
        "# 模拟数据准备\n",
        "def generator():\n",
        "    for i in range(5):\n",
        "        #假设每次准备数据需要2s\n",
        "        time.sleep(2) \n",
        "        yield i \n",
        "\n",
        "# 使用 cache 方法让数据在第一个epoch后缓存到内存中，仅限于数据集不大情形。\n",
        "ds = tf.data.Dataset.from_generator(generator,output_types = (tf.int32)).cache()\n",
        "\n",
        "# 模拟参数迭代\n",
        "def train_step():\n",
        "    #假设每一步训练需要0s\n",
        "    time.sleep(0) \n",
        "\n",
        "# 训练过程预计耗时 (5*2+5*0)+(5*0+5*0)*2 = 10s\n",
        "printbar()\n",
        "tf.print(tf.constant(\"start training...\"))\n",
        "for epoch in tf.range(3):\n",
        "    for x in ds:\n",
        "        train_step()  \n",
        "    printbar()\n",
        "    tf.print(\"epoch =\",epoch,\" ended\")\n",
        "printbar()\n",
        "tf.print(tf.constant(\"end training...\"))"
      ],
      "execution_count": 14,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "================================================================================20:39:26\n",
            "start training...\n",
            "================================================================================20:39:36\n",
            "epoch = 0  ended\n",
            "================================================================================20:39:46\n",
            "epoch = 1  ended\n",
            "================================================================================20:39:56\n",
            "epoch = 2  ended\n",
            "================================================================================20:39:56\n",
            "end training...\n",
            "================================================================================20:39:56\n",
            "start training...\n",
            "================================================================================20:40:06\n",
            "epoch = 0  ended\n",
            "================================================================================20:40:06\n",
            "epoch = 1  ended\n",
            "================================================================================20:40:06\n",
            "epoch = 2  ended\n",
            "================================================================================20:40:06\n",
            "end training...\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "NfPtrcpY7H0v"
      },
      "source": [
        "## 使用 map转换时，先batch, 然后采用向量化的转换方法对每个batch进行转换"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "th7jpI2_7LmP"
      },
      "source": [
        "# 先map后batch\n",
        "ds = tf.data.Dataset.range(100000)\n",
        "ds_map_batch = ds.map(lambda x:x**2).batch(20)\n",
        "\n",
        "printbar()\n",
        "tf.print(tf.constant(\"start scalar transformation...\"))\n",
        "for x in ds_map_batch:\n",
        "    pass\n",
        "printbar()\n",
        "tf.print(tf.constant(\"end scalar transformation...\"))\n",
        "\n",
        "# 先batch 后 map\n",
        "ds = tf.data.Dataset.range(100000)\n",
        "ds_batch_map = ds.batch(20).map(lambda x:x**2)\n",
        "\n",
        "printbar()\n",
        "tf.print(tf.constant(\"start vector transformation...\"))\n",
        "for x in ds_batch_map:\n",
        "    pass\n",
        "printbar()\n",
        "tf.print(tf.constant(\"end vector transformation...\"))"
      ],
      "execution_count": null,
      "outputs": []
    }
  ]
}